1 /**
2 * Copyright 2014 Netflix, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package rx.internal.util;
17
18 import java.util.concurrent.atomic.AtomicInteger;
19 import java.util.concurrent.atomic.AtomicIntegerArray;
20 import java.util.concurrent.atomic.AtomicReference;
21 import java.util.concurrent.atomic.AtomicReferenceArray;
22
23 import rx.Subscription;
24 import rx.functions.Func1;
25
26 /**
27 * Add/Remove without object allocation (after initial construction).
28 * <p>
29 * This is meant for hundreds or single-digit thousands of elements that need
30 * to be rapidly added and randomly or sequentially removed while avoiding object allocation.
31 * <p>
 * On Intel Core i7, 2.3GHz, Mac Java 8:
33 * <p>
34 * - adds per second single-threaded => ~32,598,500 for 100
35 * - adds per second single-threaded => ~23,200,000 for 10,000
36 * - adds + removes per second single-threaded => 15,562,100 for 100
37 * - adds + removes per second single-threaded => 8,760,000 for 10,000
38 *
39 * <pre> {@code
40 * Benchmark (size) Mode Samples Score Score error Units
41 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 263571.721 9856.994 ops/s
42 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 1763.417 211.998 ops/s
43 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 139850.115 17143.705 ops/s
44 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 809.982 72.931 ops/s
45 * } </pre>
46 *
 * @param <E> the type of element stored in the buffer
48 */
49 public final class IndexedRingBuffer<E> implements Subscription {
50
51 private static final ObjectPool<IndexedRingBuffer<?>> POOL = new ObjectPool<IndexedRingBuffer<?>>() {
52
53 @Override
54 protected IndexedRingBuffer<?> createObject() {
55 return new IndexedRingBuffer<Object>();
56 }
57
58 };
59
60 @SuppressWarnings("unchecked")
61 public final static <T> IndexedRingBuffer<T> getInstance() {
62 return (IndexedRingBuffer<T>) POOL.borrowObject();
63 }
64
65 private final ElementSection<E> elements = new ElementSection<E>();
66 private final IndexSection removed = new IndexSection();
67 /* package for unit testing */final AtomicInteger index = new AtomicInteger();
68 /* package for unit testing */final AtomicInteger removedIndex = new AtomicInteger();
69
70 // default size of ring buffer
71 /**
72 * Set at 256 ... Android defaults far smaller which likely will never hit the use cases that require the higher buffers.
73 * <p>
74 * The 10000 size test represents something that should be a rare use case (merging 10000 concurrent Observables for example)
75 *
76 * <pre> {@code
77 * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*IndexedRingBufferPerf.*'
78 *
79 * 1024
80 *
81 * Benchmark (size) Mode Samples Score Score error Units
82 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 269292.006 6013.347 ops/s
83 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 2217.103 163.396 ops/s
84 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 139349.608 9397.232 ops/s
85 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 1045.323 30.991 ops/s
86 *
87 * 512
88 *
89 * Benchmark (size) Mode Samples Score Score error Units
90 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 270919.870 5381.793 ops/s
91 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 1724.436 42.287 ops/s
92 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 141478.813 3696.030 ops/s
93 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 719.447 75.629 ops/s
94 *
95 *
96 * 256
97 *
98 * Benchmark (size) Mode Samples Score Score error Units
99 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 272042.605 7954.982 ops/s
100 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 1101.329 23.566 ops/s
101 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 140479.804 6389.060 ops/s
102 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 397.306 24.222 ops/s
103 *
104 * 128
105 *
106 * Benchmark (size) Mode Samples Score Score error Units
107 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 263065.312 11168.941 ops/s
108 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 581.708 17.397 ops/s
109 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 138051.488 4618.935 ops/s
110 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 176.873 35.669 ops/s
111 *
112 * 32
113 *
114 * Benchmark (size) Mode Samples Score Score error Units
115 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 250737.473 17260.148 ops/s
116 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 144.725 26.284 ops/s
117 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 118832.832 9082.658 ops/s
118 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 32.133 8.048 ops/s
119 *
120 * 8
121 *
122 * Benchmark (size) Mode Samples Score Score error Units
123 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 209192.847 25558.124 ops/s
124 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 26.520 3.100 ops/s
125 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 100200.463 1854.259 ops/s
126 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 8.456 2.114 ops/s
127 *
128 * 2
129 *
130 * Benchmark (size) Mode Samples Score Score error Units
131 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 96549.208 4427.239 ops/s
132 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 6.637 2.025 ops/s
133 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 34553.169 4904.197 ops/s
134 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 2.159 0.700 ops/s
135 * } </pre>
136 *
137 * Impact of IndexedRingBuffer size on merge
138 *
139 * <pre> {@code
140 * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorMergePerf.*'
141 *
142 * 512
143 *
144 * Benchmark (size) Mode Samples Score Score error Units
145 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5282500.038 530541.761 ops/s
146 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 49327.272 6382.189 ops/s
147 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 53.025 4.724 ops/s
148 * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 97395.148 2489.303 ops/s
149 * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.723 1.479 ops/s
150 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4534067.250 116321.725 ops/s
151 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 458561.098 27652.081 ops/s
152 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 43267.381 2648.107 ops/s
153 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5581051.672 144191.849 ops/s
154 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 50.643 4.354 ops/s
155 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 76437.644 959.748 ops/s
156 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 2965.306 272.928 ops/s
157 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 5026522.098 364196.255 ops/s
158 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 34926.819 938.612 ops/s
159 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 33.342 1.701 ops/s
160 *
161 *
162 * 128
163 *
164 * Benchmark (size) Mode Samples Score Score error Units
165 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5144891.776 271990.561 ops/s
166 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 53580.161 2370.204 ops/s
167 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 53.265 2.236 ops/s
168 * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 96634.426 1417.430 ops/s
169 * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.648 0.255 ops/s
170 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4601280.220 53157.938 ops/s
171 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 463394.568 58612.882 ops/s
172 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 50503.565 2394.168 ops/s
173 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5490315.842 228654.817 ops/s
174 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 50.661 3.385 ops/s
175 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 74716.169 7413.642 ops/s
176 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 3009.476 277.075 ops/s
177 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 4953313.642 307512.126 ops/s
178 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 35335.579 2368.377 ops/s
179 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 37.450 0.655 ops/s
180 *
181 * 32
182 *
183 * Benchmark (size) Mode Samples Score Score error Units
184 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 4975957.497 365423.694 ops/s
185 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 52141.226 5056.658 ops/s
186 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 53.663 2.671 ops/s
187 * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 96507.893 1833.371 ops/s
188 * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.850 0.782 ops/s
189 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4557128.302 118516.934 ops/s
190 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 339005.037 10594.737 ops/s
191 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 50781.535 6071.787 ops/s
192 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5604920.068 209285.840 ops/s
193 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 50.413 7.496 ops/s
194 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 76098.942 558.187 ops/s
195 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 2988.137 193.255 ops/s
196 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 5177255.256 150253.086 ops/s
197 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 34772.490 909.967 ops/s
198 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 34.847 0.606 ops/s
199 *
200 * 8
201 *
202 * Benchmark (size) Mode Samples Score Score error Units
203 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5027331.903 337986.410 ops/s
204 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 51746.540 3585.450 ops/s
205 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 52.682 4.026 ops/s
206 * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 96805.587 2868.112 ops/s
207 * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.598 0.290 ops/s
208 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4390912.630 300687.310 ops/s
209 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 458615.731 56125.958 ops/s
210 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 49033.105 6132.936 ops/s
211 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5090614.100 649439.778 ops/s
212 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 48.548 3.586 ops/s
213 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 72285.482 16820.952 ops/s
214 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 2981.576 316.140 ops/s
215 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 4993609.293 267975.397 ops/s
216 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 33228.972 1554.924 ops/s
217 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 32.994 3.615 ops/s
218 *
219 *
220 * 2
221 *
222 * Benchmark (size) Mode Samples Score Score error Units
223 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5103812.234 939461.192 ops/s
224 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 51491.116 3790.056 ops/s
225 * r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 54.043 2.340 ops/s
226 * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 96575.834 13416.541 ops/s
227 * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.740 0.047 ops/s
228 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4435909.832 899133.671 ops/s
229 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 392382.445 59814.783 ops/s
230 * r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 50429.258 7489.849 ops/s
231 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5637321.803 161838.195 ops/s
232 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 51.065 2.138 ops/s
233 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 76366.764 2631.710 ops/s
234 * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 2978.302 296.418 ops/s
235 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 5280829.290 1602542.493 ops/s
236 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 35070.518 3565.672 ops/s
237 * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 34.501 0.991 ops/s
238 *
239 * } </pre>
240 */
static int _size = 256;
static {
    // lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820)
    if (PlatformDependent.isAndroid()) {
        _size = 8;
    }

    // possible system property for overriding
    String sizeFromProperty = System.getProperty("rx.indexed-ring-buffer.size"); // also see RxRingBuffer
    if (sizeFromProperty != null) {
        try {
            int requested = Integer.parseInt(sizeFromProperty);
            if (requested > 0) {
                _size = requested;
            } else {
                // a zero or negative section size cannot back the arrays and would make
                // every "% SIZE" / "/ SIZE" computation fail, so the default is kept instead
                System.err.println("Failed to set 'rx.indexed-ring-buffer.size' with value " + sizeFromProperty + " => size must be greater than 0");
            }
        } catch (NumberFormatException e) {
            // best effort: an unparsable value is reported and the default is kept
            System.err.println("Failed to set 'rx.indexed-ring-buffer.size' with value " + sizeFromProperty + " => " + e.getMessage());
        }
    }
}

/**
 * Number of slots in each section, fixed once at class initialization from {@code _size}
 * (package-private for unit testing).
 */
static final int SIZE = _size;
260
261 /**
262 * This resets the arrays, nulls out references and returns it to the pool.
263 * This extra CPU cost is far smaller than the object allocation cost of not pooling.
264 */
265 public void releaseToPool() {
266 // need to clear all elements so we don't leak memory
267 int maxIndex = index.get();
268 int realIndex = 0;
269 ElementSection<E> section = elements;
270 outer: while (section != null) {
271 for (int i = 0; i < SIZE; i++, realIndex++) {
272 if (realIndex >= maxIndex) {
273 section = null;
274 break outer;
275 }
276 // we can use lazySet here because we are nulling things out and not accessing them again
277 // (relative on Mac Intel i7) lazySet gets us ~30m vs ~26m ops/second in the JMH test (100 adds per release)
278 section.array.set(i, null);
279 }
280 section = section.next.get();
281 }
282
283 index.set(0);
284 removedIndex.set(0);
285 POOL.returnObject(this);
286 }
287
288 @Override
289 public void unsubscribe() {
290 releaseToPool();
291 }
292
/**
 * Private: instances are created inside this class only (see the pool's {@code createObject()}).
 */
private IndexedRingBuffer() {
    // nothing to initialize beyond the field initializers
}
295
296 /**
297 * Add an element and return the index where it was added to allow removal.
298 *
299 * @param e
300 * @return
301 */
302 public int add(E e) {
303 int i = getIndexForAdd();
304 if (i < SIZE) {
305 // fast-path when we are in the first section
306 elements.array.set(i, e);
307 return i;
308 } else {
309 int sectionIndex = i % SIZE;
310 getElementSection(i).array.set(sectionIndex, e);
311 return i;
312 }
313 }
314
315 public E remove(int index) {
316 E e;
317 if (index < SIZE) {
318 // fast-path when we are in the first section
319 e = elements.array.getAndSet(index, null);
320 } else {
321 int sectionIndex = index % SIZE;
322 e = getElementSection(index).array.getAndSet(sectionIndex, null);
323 }
324 pushRemovedIndex(index);
325 return e;
326 }
327
328 private IndexSection getIndexSection(int index) {
329 // short-cut the normal case
330 if (index < SIZE) {
331 return removed;
332 }
333
334 // if we have passed the first array we get more complicated and do recursive chaining
335 int numSections = index / SIZE;
336 IndexSection a = removed;
337 for (int i = 0; i < numSections; i++) {
338 a = a.getNext();
339 }
340 return a;
341 }
342
343 private ElementSection<E> getElementSection(int index) {
344 // short-cut the normal case
345 if (index < SIZE) {
346 return elements;
347 }
348
349 // if we have passed the first array we get more complicated and do recursive chaining
350 int numSections = index / SIZE;
351 ElementSection<E> a = elements;
352 for (int i = 0; i < numSections; i++) {
353 a = a.getNext();
354 }
355 return a;
356 }
357
358 private synchronized int getIndexForAdd() {
359 /*
360 * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
361 */
362 int i;
363 int ri = getIndexFromPreviouslyRemoved();
364 if (ri >= 0) {
365 if (ri < SIZE) {
366 // fast-path when we are in the first section
367 i = removed.getAndSet(ri, -1);
368 } else {
369 int sectionIndex = ri % SIZE;
370 i = getIndexSection(ri).getAndSet(sectionIndex, -1);
371 }
372 if (i == index.get()) {
373 // if it was the last index removed, when we pick it up again we want to increment
374 index.getAndIncrement();
375 }
376 } else {
377 i = index.getAndIncrement();
378 }
379 return i;
380 }
381
382 /**
383 * Returns -1 if nothing, 0 or greater if the index should be used
384 *
385 * @return
386 */
387 private synchronized int getIndexFromPreviouslyRemoved() {
388 /*
389 * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
390 */
391
392 // loop because of CAS
393 while (true) {
394 int currentRi = removedIndex.get();
395 if (currentRi > 0) {
396 // claim it
397 if (removedIndex.compareAndSet(currentRi, currentRi - 1)) {
398 return currentRi - 1;
399 }
400 } else {
401 // do nothing
402 return -1;
403 }
404 }
405 }
406
407 private synchronized void pushRemovedIndex(int elementIndex) {
408 /*
409 * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
410 */
411
412 int i = removedIndex.getAndIncrement();
413 if (i < SIZE) {
414 // fast-path when we are in the first section
415 removed.set(i, elementIndex);
416 } else {
417 int sectionIndex = i % SIZE;
418 getIndexSection(i).set(sectionIndex, elementIndex);
419 }
420 }
421
422 @Override
423 public boolean isUnsubscribed() {
424 return false;
425 }
426
427 public int forEach(Func1<? super E, Boolean> action) {
428 return forEach(action, 0);
429 }
430
431 /**
432 *
433 * @param action
434 * that processes each item and returns true if it wants to continue to the next
435 * @return int of next index to process, or last index seen if it exited early
436 */
437 public int forEach(Func1<? super E, Boolean> action, int startIndex) {
438 int endedAt = forEach(action, startIndex, index.get());
439 if (startIndex > 0 && endedAt == index.get()) {
440 // start at the beginning again and go up to startIndex
441 endedAt = forEach(action, 0, startIndex);
442 } else if (endedAt == index.get()) {
443 // start back at the beginning
444 endedAt = 0;
445 }
446 return endedAt;
447 }
448
449 private int forEach(Func1<? super E, Boolean> action, int startIndex, int endIndex) {
450 int lastIndex = startIndex;
451 int maxIndex = index.get();
452 int realIndex = startIndex;
453 ElementSection<E> section = elements;
454
455 if (startIndex >= SIZE) {
456 // move into the correct section
457 section = getElementSection(startIndex);
458 startIndex = startIndex % SIZE;
459 }
460
461 outer: while (section != null) {
462 for (int i = startIndex; i < SIZE; i++, realIndex++) {
463 if (realIndex >= maxIndex || realIndex >= endIndex) {
464 section = null;
465 break outer;
466 }
467 E element = section.array.get(i);
468 if (element == null) {
469 continue;
470 }
471 lastIndex = realIndex;
472 boolean continueLoop = action.call(element);
473 if (!continueLoop) {
474 return lastIndex;
475 }
476 }
477 section = section.next.get();
478 startIndex = 0; // reset to start for next section
479 }
480
481 // return the OutOfBounds index position if we processed all of them ... the one we should be less-than
482 return realIndex;
483 }
484
485 private static class ElementSection<E> {
486 private final AtomicReferenceArray<E> array = new AtomicReferenceArray<E>(SIZE);
487 private final AtomicReference<ElementSection<E>> next = new AtomicReference<ElementSection<E>>();
488
489 ElementSection<E> getNext() {
490 if (next.get() != null) {
491 return next.get();
492 } else {
493 ElementSection<E> newSection = new ElementSection<E>();
494 if (next.compareAndSet(null, newSection)) {
495 // we won
496 return newSection;
497 } else {
498 // we lost so get the value that won
499 return next.get();
500 }
501 }
502 }
503 }
504
505 private static class IndexSection {
506
507 private final AtomicIntegerArray unsafeArray = new AtomicIntegerArray(SIZE);
508
509 public int getAndSet(int expected, int newValue) {
510 return unsafeArray.getAndSet(expected, newValue);
511 }
512
513 public void set(int i, int elementIndex) {
514 unsafeArray.set(i, elementIndex);
515 }
516
517 private final AtomicReference<IndexSection> _next = new AtomicReference<IndexSection>();
518
519 IndexSection getNext() {
520 if (_next.get() != null) {
521 return _next.get();
522 } else {
523 IndexSection newSection = new IndexSection();
524 if (_next.compareAndSet(null, newSection)) {
525 // we won
526 return newSection;
527 } else {
528 // we lost so get the value that won
529 return _next.get();
530 }
531 }
532 }
533
534 }
535
536 }